"""
1. ping

"""





if __name__ == '__main__':
    import sys
    sys.path.append('../../')






import time
import hmac
import hashlib
import json
import requests
from quant.markets import Functions
from quant.markets.functions import get_asset_value
from quant.accounts import Order
from quant.utils import logging, Timer2, catch_exception, LimitDict, Iota
from quant.const import *

from quant.exchanges.util import (spot_value_factor, RecentChecker2, AscendingChecker, update_balance_default, prepare_placing,
                                  prepare_canceling, simulate_deal_spot, Socket)
from quant.exchanges.basics import ChannelBasic, HandlerBasic, ApiBasic, SpiBasic
# from quant.exchanges.gate_util import KucoinSocket, format_symbol, resume_symbol, is_welcome_data, build_req
from quant.exchanges.kucoin_util import get_ws_endpoint, is_welcome


class KucoinSpotChannel(ChannelBasic):
    def init(self):
        self.socket = Socket(None, self.on_open, self.on_message, self.on_close, self.pre_con)

    def pre_con(self, ws):
        while True:
            token = get_ws_endpoint(rest_host)[0]
            if token is not None:
                break
            time.sleep(1)

        host = '{}/endpoint?token={}'.format(ws_host, token)
        ws.url = host

    def subscribe_book(self, ws):
        if self.frequency == DataFrequency.High:
            msg = {
                "id": 1,
                "type": "subscribe",
                "topic": "/market/level2:{}".format(format_symbol(self.symbol)),
                "response": True
            }
        else:
            msg = {
                "id": 1,
                "type": "subscribe",
                "topic": "/spotMarket/level2Depth50:{}".format(format_symbol(self.symbol)),
                "response": True
            }
        msg = json.dumps(msg)
        self.socket.send(msg)

    def subscribe_trade(self, ws):
        msg = {
            "id": 1,
            "type": "subscribe",
            "topic": "/market/match:{}".format(format_symbol(self.symbol)),
            "privateChannel": False,
            "response": True
        }
        msg = json.dumps(msg)
        self.socket.send(msg)

    def subscribe_ticker(self, ws):
        err = 'KucoinSpotChannel.subscribe_ticker() not implemented'
        raise NotImplementedError(err)


class KucoinSpotHandler(HandlerBasic):
    def init(self):
        self.data_id_checker = RecentChecker2(50)
        self.data_id_checker_2 = AscendingChecker()

    def process_book(self, routing_key, recv_time, raw):
        if raw == NAME_CHANNEL_CLOSE:
            return self.process_close(routing_key, recv_time)

        data = json.loads(raw)
        if is_welcome(data):
            return self.info_welcome()

        data = data['data']
        book = self.book

        if self.frequency == DataFrequency.High:
            server_id = data['sequenceEnd']
            server_time = data['time'] / 1000
            self.markets.info_engine.push_process_recv(server_id, server_time, recv_time)

            if not self.data_id_checker.is_new(server_id):
                return

            data = data['changes']
            bids = data['bids']
            asks = data['asks']

            for p, v, _ in bids:
                book['buy', float(p)] = float(v)
            for p, v, _ in asks:
                book['sell', float(p)] = float(v)

        else:
            server_time = data['timestamp']
            server_id = server_time
            self.markets.info_engine.push_process_recv(server_id, server_time, recv_time)

            if not self.data_id_checker_2.is_new(server_id):
                return

            bids = data['bids']
            asks = data['asks']
            bids = [[float(p), float(v)] for p, v in bids]
            asks = [[float(p), float(v)] for p, v in asks]

            book['buy'] = bids
            book['sell'] = asks

        self.check_book(book)
        self.push_book(routing_key, recv_time, server_time, server_id, book)

    def process_trade(self, routing_key, recv_time, raw):
        if raw == NAME_CHANNEL_CLOSE:
            return self.process_close(routing_key, recv_time)

        data = json.loads(raw)
        if is_welcome(data):
            return self.info_welcome()

        data = data['data']
        server_id = data['tradeId']
        server_time = float(data['time'][:13]) / 1000
        self.markets.info_engine.push_process_recv(server_id, server_time, recv_time)

        if not self.data_id_checker.is_new(server_id):
            return

        trade_list = [[
            data['side'],
            float(data['price']),
            float(data['size']),
        ]]

        self.push_trade(routing_key, recv_time, server_time, server_id, trade_list)

    def process_ticker(self, routing_key, recv_time, raw):
        if raw == NAME_CHANNEL_CLOSE:
            return self.process_close(routing_key, recv_time)

        j = json.loads(raw)
        server_id = j['u']
        self.markets.info_engine.push_process_recv(server_id, None, recv_time)

        if not self.data_id_checker_2.is_new(server_id):
            return

        ticker = (
            (float(j['b']), float(j['B'])),
            (float(j['a']), float(j['A'])),
        )
        self.push_ticker(routing_key, recv_time, None, server_id, ticker)


class KucoinSpotApi(ApiBasic):
    def query_balance(self, symbol=None):
        path = 'api/v3/account'
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key)
        self.requesting.request_async(symbol, req, self._on_balance)

    def query_all_balance(self):
        path = 'api/v3/account'
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key)
        self.requesting.request_async(None, req, self._on_all_balance)

    def query_all_position(self):
        return

    def query_all_margin(self):
        return

    def query_open_orders(self, symbol=None):
        path = 'api/v3/openOrders'
        param = {'symbol': format_symbol(symbol, True)}
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key, param)
        self.requesting.request_async(symbol, req, self._on_open_orders)

    def query_all_open_orders(self):
        path = 'api/v3/openOrders'
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key)
        self.requesting.request_async(None, req, self._on_all_open_orders)

    def place_order(self, order):
        return self.executor.place_order(order)

    def cancel_order(self, order):
        return self.executor.cancel_order(order)

    def _on_balance(self, symbol, response):
        if int(response.status_code) // 100 != 2:
            return self._fail_request('query_balance', symbol, response)

        if symbol is None:
            interests = self._asset_added
        else:
            interests = {*symbol.split('/')}

        j = response.json()
        balance = self.account.balance

        for dic in j['balances']:
            asset = dic['asset'].lower()
            if asset in interests:
                update_balance_default(balance, asset, float(dic['free']), float(dic['locked']))

        self._put_data(UserEvent.Balance, balance)

    def _on_all_balance(self, _, response):
        if int(response.status_code) // 100 != 2:
            return self._fail_request('query_all_balance', _, response)

        j = response.json()
        balance = {}

        for dic in j['balances']:
            asset = dic['asset'].lower()
            free = float(dic['free'])
            frozen = float(dic['locked'])
            if free or frozen:
                update_balance_default(balance, asset, free, frozen)

        self.account.balance.clear()
        self.account.balance.update(balance)

        self._put_data(UserEvent.Balance, balance)

    def _on_open_orders(self, symbol, response):
        if int(response.status_code) // 100 != 2:
            return self._fail_request('query_open_orders', symbol, response)

        open_orders = []
        for dic in response.json():
            order = Order(
                symbol=symbol,
                side=dic['side'].lower(),
                price=float(dic['price']),
                amount=abs(float(dic['origQty']) - float(dic['executedQty'])),
                client_id=dic['clientOrderId'],  # 作为template时要注意，有的交易所不会赋值手动单的cid.
                order_id=dic['orderId'],
                status=OrderStatus.Pending,
            )
            open_orders.append(order)

        self.account.order_processor.process_open_orders(open_orders)
        self._put_data(UserEvent.OpenOrders, self.account.orders)

    def _on_all_open_orders(self, _, response):
        if int(response.status_code) // 100 != 2:
            return self._fail_request('query_all_open_orders', None, response)

        open_orders = []
        for dic in response.json():
            order = Order(
                symbol=resume_symbol(dic['symbol']),
                side=dic['side'].lower(),
                price=float(dic['price']),
                amount=abs(float(dic['origQty']) - float(dic['executedQty'])),
                client_id=dic['clientOrderId'],  # 作为template时要注意，有的交易所不会赋值手动单的cid.
                order_id=dic['orderId'],
                status=OrderStatus.Pending,
            )
            open_orders.append(order)

        if open_orders:
            self.account.order_processor.process_open_orders(open_orders)
            self._put_data(UserEvent.OpenOrders, self.account.orders)

    def transfer_swap(self, amount, back_to_spot=False, asset='usdt'):
        type_ = 'MAIN_UMFUTURE'
        if back_to_spot:
            type_ = 'UMFUTURE_MAIN'

        path = 'sapi/v1/asset/transfer'
        param = {
            'type': type_,
            'asset': asset.upper(),
            'amount': amount,
        }
        req = build_request(rest_host, 'POST', path, self.api_key, self.secret_key, param)
        resp = self.requesting.request(req)
        return resp.json()

    def get_trade_fee(self):
        path = 'sapi/v1/asset/tradeFee'
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key)

        resp = self.requesting.request(req)
        data = resp.json()
        for d in data:
            print(d['symbol'], d['makerCommission'], d['takerCommission'])

    def get_recent_deal(self, symbol, limit=10):
        path = 'api/v3/myTrades'

        param = {
            'symbol': format_symbol(symbol, True),
            'limit': limit
        }
        req = build_request(rest_host, 'GET', path, self.api_key, self.secret_key, param)
        resp = self.requesting.request(req)
        return resp.json()


class RestExecutor:
    def __init__(self, api):
        self.api = api

    def place_order(self, order):
        api = self.api
        order = prepare_placing(order, api._create_cid())

        path = 'api/v3/order'
        param = {
            'symbol': format_symbol(order.symbol, True),
            'side': order.side.upper(),
            'price': avoid_sci_num(order.price),
            'quantity': order.amount,
            'type': type_map[order.order_type],
            'newClientOrderId': order.client_id,
        }

        if order.order_type == OrderType.Limit:
            param['timeInForce'] = 'GTC'
        elif order.order_type == OrderType.Market:
            del param['price']

        req = build_request(rest_host, 'POST', path, api.api_key, api.secret_key, param)
        api.account.order_processor.process_place_operation(order)
        api.requesting.request_async(order, req, self._on_place)
        return order.client_id

    def cancel_order(self, order):
        api = self.api
        order = prepare_canceling(order)
        client_id = order.client_id

        if client_id is None:
            return logging.error('cancel order without cid: {}'.format(order))

        path = 'api/v3/order'
        param = {
            'symbol': format_symbol(order.symbol, True),
            'origClientOrderId': client_id,
        }

        req = build_request(rest_host, 'DELETE', path, api.api_key, api.secret_key, param)
        api.account.order_processor.process_cancel_operation(order)
        api.requesting.request_async(order, req, self._on_cancel)

    def _on_place(self, order, response):
        api = self.api
        order = order.copy()

        if int(response.status_code) // 100 != 2:
            api._fail_request('place_order', order, response)
            order.status = OrderStatus.PlaceFailed
        else:
            j = response.json()
            order.order_id = j['orderId']
            order.status = OrderStatus.Pending

            if order.order_type == OrderType.Market:
                order.status = OrderStatus.FullyFilled

        order = api.account.order_processor.update_order(order)
        api._put_data(UserEvent.Order, order)

    def _on_cancel(self, order, response):
        api = self.api
        order = order.copy()

        if int(response.status_code) // 100 != 2:
            api._fail_request('cancel_order', order, response)

        if int(response.status_code) == -1:
            order.status = OrderStatus.CancelFailed
        else:
            order.status = OrderStatus.Canceled

        order = api.account.order_processor.update_order(order)
        api._put_data(UserEvent.Order, order)


class WsExecutor:
    def __init__(self, api):
        self.api = api
        self.socket = Socket(ws_executor_host, self._on_open, self._on_message, lambda x: None)
        self.socket.connect()

        self._msg_head = next_cid_head()
        self._msg_iota = Iota()
        self._placing = {}  # msg_id: (ts, order)
        self._canceling = {}  # msg_id: (ts, order)
        Timer2(self._kill_timeout, 1).start()

    def place_order(self, order):
        api = self.api
        msg_id = '{}{}'.format(self._msg_head, self._msg_iota.next())
        order = prepare_placing(order, api._create_cid())
        self._placing[msg_id] = (time.time(), order)

        param = {
            'symbol': format_symbol(order.symbol, True),
            'side': order.side.upper(),
            'price': avoid_sci_num(order.price),
            'quantity': order.amount,
            'type': type_map[order.order_type],
            'newClientOrderId': order.client_id,
        }

        if order.order_type == OrderType.Limit:
            param['timeInForce'] = 'GTC'
        elif order.order_type == OrderType.Market:
            del param['price']

        param = sign_ws_param(param, self.api.api_key, self.api.secret_key)
        data = {"id": msg_id, "method": "order.place", "params": param}
        msg = json.dumps(data)

        api.account.order_processor.process_place_operation(order)
        self.socket.assert_send(msg)
        return order.client_id

    def cancel_order(self, order):
        api = self.api
        msg_id = '{}{}'.format(self._msg_head, self._msg_iota.next())
        order = prepare_canceling(order)
        self._canceling[msg_id] = (time.time(), order)

        client_id = order.client_id
        if client_id is None:
            return logging.error('cancel order without cid: {}'.format(order))

        params = {
            'symbol': format_symbol(order.symbol, True),
            'origClientOrderId': client_id,
        }
        params = sign_ws_param(params, self.api.api_key, self.api.secret_key)
        data = {"id": msg_id, "method": "order.cancel", "params": params}

        msg = json.dumps(data)
        api.account.order_processor.process_cancel_operation(order)
        self.socket.assert_send(msg)

    def _on_place(self, data):
        id_ = data['id']
        order = self._placing.pop(id_)[1]
        order = order.copy()
        status_code = data['status']
        api = self.api

        if int(status_code) // 100 != 2:
            response = self._build_fail_response(status_code, str(data))
            api._fail_request('place_order', order, response)
            order.status = OrderStatus.PlaceFailed
        else:
            order.order_id = data['result']['orderId']
            order.status = OrderStatus.Pending

            if order.order_type == OrderType.Market:
                order.status = OrderStatus.FullyFilled

        order = api.account.order_processor.update_order(order)
        api._put_data(UserEvent.Order, order)

    def _on_cancel(self, data):
        id_ = data['id']
        order = self._canceling.pop(id_)[1]
        order = order.copy()
        status_code = data['status']
        api = self.api

        if int(status_code) // 100 != 2:
            response = self._build_fail_response(status_code, str(data))
            api._fail_request('cancel_order', order, response)

        order.status = OrderStatus.Canceled
        order = api.account.order_processor.update_order(order)
        api._put_data(UserEvent.Order, order)

    def _on_open(self, ws):
        return

    def _on_message(self, ws, message):
        data = json.loads(message)
        id_ = data['id']
        if id_ in self._placing:
            self._on_place(data)
        elif id_ in self._canceling:
            self._on_cancel(data)
        else:
            logging.warn('KucoinWsExe unknown message: {}'.format(message))

    def _build_fail_response(self, code, text):
        try:
            content = '"{}"'.format(text)
            content = content.encode()
        except Exception as _:
            err = 'Requesting._build_fail_response() err, cannot encode {}'.format(text)
            logging.error(err)
            content = b'{}'

        resp = requests.Response()
        resp._content = content
        resp.status_code = code
        return resp

    def _kill_timeout(self):
        timeout = config.rest_timeout
        too_late = time.time() - timeout
        placing = self._placing
        canceling = self._canceling
        kill_place = [msg_id for msg_id, (ts, order) in placing.items() if ts < too_late]
        kill_cancel = [msg_id for msg_id, (ts, order) in canceling.items() if ts < too_late]

        api = self.api
        for msg_id in kill_place:
            ts, order = placing.pop(msg_id)
            order = order.copy()

            response = self._build_fail_response(-1, 'ws_executor timeout')
            api._fail_request('place_order', order, response)

            order.status = OrderStatus.PlaceFailed
            order = api.account.order_processor.update_order(order)
            api._put_data(UserEvent.Order, order)

        for msg_id in kill_cancel:
            ts, order = canceling.pop(msg_id)
            order = order.copy()

            response = self._build_fail_response(-1, 'ws_executor timeout')
            api._fail_request('cancel_order', order, response)

            order.status = OrderStatus.CancelFailed
            order = api.account.order_processor.update_order(order)
            api._put_data(UserEvent.Order, order)


class KucoinSpotSpi(SpiBasic):
    _listen_key = ''
    _extend_key_started = False

    def add_symbol(self, symbol):
        super().add_symbol(symbol)
        self.connect_once()
        if not self._extend_key_started:
            Timer2(self._extend_listen_key, extend_listen_key_interval).start()
            self._extend_key_started = True

    def _create_socket(self):
        socket = Socket('', self._on_open, self._on_message, self._on_close, self._pre_connect)
        return socket

    def _pre_connect(self, ws):
        listen_key = None
        while listen_key is None:
            try:
                listen_key = get_listen_key(self.api_key, self.secret_key)
                self._listen_key = listen_key
            except:
                catch_exception()
            if listen_key is None:
                time.sleep(1)

        host = '{}/{}'.format(ws_host, listen_key)
        self._socket.set_host(host)

    def _extend_listen_key(self):
        try:
            result = extend_listen_key(self.api_key, self.secret_key, self._listen_key)
        except:
            catch_exception()

    def _on_listen_key_expire(self):  # 似乎用不上。因为没有expire回报
        logging.warn('KucoinSpotSpi listen key expired')
        self._socket.reconnect()

    def _on_message(self, ws, message):
        j = json.loads(message)
        e = j['e']
        if e == 'outboundAccountPosition':
            self._handle_balance(j)
        elif e == 'executionReport':
            self._handle_order(j)
        elif e == 'listenKeyExpired':  # 现货似乎没有expire回报。先留着吧
            self._on_listen_key_expire()
        else:
            logging.warn('KucoinSpotSpi unknown msg:{}'.format(message))

    def _handle_balance(self, data):
        balance = self.account.balance
        for dic in data['B']:
            asset = dic['a'].lower()
            free = float(dic['f'])
            frozen = float(dic['l'])
            update_balance_default(balance, asset, free, frozen)
        self._put_data(UserEvent.Balance, balance)

    def _handle_order(self, data):
        cid = data['C']
        if cid == '':
            cid = data['c']

        symbol = resume_symbol(data['s'])
        if symbol not in self._symbol_added:
            return

        order = Order(
            symbol=symbol,
            side=data['S'].lower(),
            price=float(data['p']),
            amount=float(data['q']) - float(data['z']),
            order_id=data['i'],
            client_id=cid,
            status=status_map[data['X']]
        )
        order = self.account.order_processor.update_order(order)
        self._put_data(UserEvent.Order, order)

        deal = float(data['l'])
        if deal != 0:
            self._put_deal(order, deal)


class KucoinSpotFunctions(Functions):
    def get_all_ticker(self):
        path = '/api/v1/market/allTickers'
        url = rest_host + path

        response = self.session.get(url, timeout=3)
        if int(response.status_code) // 100 != 2:
            return {}
        j = response.json()

        result = {}
        _usdt = get_asset_value('usdt')

        for d in j['data']['ticker']:
            symbol = resume_symbol(d['symbol'])
            if not symbol.endswith('usdt'):
                continue

            if d['last'] is None:
                continue

            result[symbol] = {
                'price': float(d['last']),
                'vol': float(d['volValue']) * _usdt,
                'bid': float(d['buy']),
                'ask': float(d['sell']),
            }
        return result

    def get_recent_trade(self, symbol, _limit=None):
        path = '/api/v1/market/histories'
        url = '{}{}?symbol={}'.format(rest_host, path, format_symbol(symbol))

        response = self.session.get(url, timeout=3)
        if int(response.status_code) // 100 != 2:
            return {}
        j = response.json()

        result = [
            (
                d['sequence'],
                d['side'],
                float(d['price']),
                float(d['size']),
                float(str(d['time'])[:13]) / 1000,
            )
            for d in j['data']
        ]
        return result

    def get_all_precision(self):
        path = '/api/v2/symbols'
        url = rest_host + path
        response = self.session.get(url, timeout=3)
        if int(response.status_code) // 100 != 2:
            return {}

        result = {}
        for d in response.json()['data']:
            symbol = resume_symbol(d['symbol'])
            if not symbol.endswith('/usdt'):
                continue
            price_unit = d['priceIncrement']
            amount_unit = d['baseIncrement']

            result[symbol] = (price_unit, amount_unit)
        return result

    def get_value_factor(self, symbol):
        return spot_value_factor(symbol)

    def get_all_contract_size(self):
        all_ticker = self.get_all_ticker()
        result = {s: 1 for s in all_ticker}
        return result

    def simulate_deal(self, order, deal_amount, mid_price, account):
        return simulate_deal_spot(order, deal_amount, mid_price, account)


def format_symbol(symbol):
    contract = symbol.replace('/', '-')
    return contract.upper()


def resume_symbol(contract):
    symbol = contract.replace('-', '/')
    return symbol.lower()


rest_host = 'https://api.kucoin.com'
# ws_host = 'wss://push1-v2.kucoin.com'
ws_host = 'wss://ws-api-spot.kucoin.com'


if __name__ == '__main__':
    import utils
    from quant.utils import perf_intv
    import keys
    from quant.markets import Markets, functions
    from quant.accounts import Accounts, Order
    from quant.utils import set_test_mode
    from utils import feed_test_raw, print_req
    set_test_mode()

    def try_channel():
        s = 'eth/usdt'
        mar = Markets()
        # mar.add_market(Event.Book, Exchange.Kucoin, s, DataFrequency.High)
        # mar.add_market(Event.Book, Exchange.Kucoin, s, DataFrequency.High)
        mar.add_market(Event.Trade, Exchange.Kucoin, s, DataFrequency.Normal)
        mar.add_market(Event.Trade, Exchange.Kucoin, s, DataFrequency.Normal)
        mar.add_market(Event.Trade, Exchange.Kucoin, s, DataFrequency.Normal)

        # utils.count_raw(mar)
        # utils.print_book(mar)
        # utils.print_raw(mar, cut=50)
        # utils.print_trades(mar)
        # utils.perf_raw_to_data(mar)
        utils.check_raw_to_data_duplicate(mar, False)

        while True:
            input(':')

    def try_api():
        key = keys.get_key('gate_9293')
        account = Accounts().create_account('Kucoin', key)
        api = account.api

        # account.add_symbol('btc/usdt')
        account.info_engine.show_user_data()
        account.info_engine.show_problems()

        # api.query_balance()
        # api.query_all_balance()
        # api.query_open_orders('eth/usdt')
        # api.query_all_open_orders()

        # order = Order('eth/usdt', 'buy', 1, 1)
        # order = Order('doge/usdt', 'buy', 0.05, 21, order_type=OrderType.PostOnly)
        # api.place_order(order)

        api.add_symbol('doge/usdt')
        api.query_open_orders()
        api.query_open_orders()
        api.query_open_orders()
        api.query_open_orders()

        api.query_all_open_orders()
        api.query_all_open_orders()
        api.query_all_open_orders()
        api.query_all_open_orders()
        # input('wait order:')
        # for o in list(account.orders.values()):
        #     input('cancel:')
        #     api.cancel_order(o)

        # spi = KucoinSpotSpi(key)
        # spi.add_symbol('doge/usdt')

        # def on_ud(ud):
        #     print(ud)
        # account.subscribe(UserEvent.Order, on_ud)

        # def on_deal(order, amount):
        #     print('deal amount:', amount)
        # info = spi.account.info_engine
        # info.subscribe(info.Deal, on_deal)

        while True:
            input(':')

    def try_spi():
        key = keys.get_key('gate_9293')
        spi = KucoinSpotSpi(key)
        acc = spi.account
        # acc.info_engine.show_user_data()
        acc.info_engine.show_problems()

        def on_ud(ud):
            print(ud.data)
        acc.subscribe(UserEvent.Order, on_ud)

        acc.info_engine.show_deal()

        spi.connect_once()
        spi.add_symbol('doge/usdt')

        while True:
            print('-----------orders-----------')
            for i in acc.orders.values():
                print(i)
            input(':')

    def try_functions():
        fn = KucoinSpotFunctions()
        # a = fn.get_all_ticker()
        # a = [(s, d) for s, d in a.items()]
        # a.sort(key=lambda x: x[1]['vol'], reverse=True)
        # for i in a:
        #     print(i)

        # a = fn.get_recent_trade('eth/usdt')
        # for i in a:
        #     print(i)
        # print("len:", len(a))

        a = fn.get_all_precision()
        for i in a.items():
            print(i)

    try_channel()


















